package cn.rdtimes.wolfdsp.core.job;


import cn.rdtimes.wolfdsp.core.conf.Configuration;
import cn.rdtimes.wolfdsp.core.counter.PrefCounter;
import cn.rdtimes.wolfdsp.core.data.*;
import cn.rdtimes.wolfdsp.core.ha.StateSyncService;
import cn.rdtimes.wolfdsp.core.ids.IdsGenerator;
import cn.rdtimes.wolfdsp.core.invoker.AbstractTaskInvokeHandler;
import cn.rdtimes.wolfdsp.core.invoker.TaskHttpInvokeHandler;
import cn.rdtimes.wolfdsp.core.invoker.TaskInvokeHandler;
import cn.rdtimes.wolfdsp.core.invoker.TaskInvokeHandlerFactory;
import cn.rdtimes.wolfdsp.core.invoker.http.HttpInvokeObject;
import cn.rdtimes.wolfdsp.core.invoker.http.ResponseMessage;
import cn.rdtimes.wolfdsp.core.service.ScheduleManager;
import cn.rdtimes.wolfdsp.core.util.ResponseUtil;
import cn.rdtimes.wolfdsp.core.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;

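/**
 * Job graph (workflow) service implementation.
 * Design outline:
 * JobDefinition-->ScheduledInstance-->JobGraphInstance-->TaskInstance
 * <p>
 * Note:
 * Distributed invocation here uses HTTP combined with internal asynchronous processing,
 * so messages may be handled out of order.
 *
 * @author BZ
 */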
public class JobGraphServiceImpl implements JobGraphService {
    /**
     * Logger shared by this class and its inner classes.
     */
    private static final Logger logger = LoggerFactory.getLogger(JobGraphServiceImpl.class);

    /**
     * Counters for the instances invoked by this service; exposed through getPrefCounter().
     */
    private final ScheduleCounters counters = new ScheduleCounters();

    /**
     * Schedule instances waiting to be scheduled.
     * The element type is Object because two kinds of element are expected: a
     * PerformScheduledInstance (enqueued by schedule()) or a plain Object
     * (presumably the QUEUE_NULL sentinel declared below; confirm against the queue consumer).
     */
    private final BlockingQueue<Object> pendingPerformScheduleInstanceQueue;

    /**
     * Marker object used to end a wait on the queue.
     */
    private final Object QUEUE_NULL = new Object();

    /**
     * Job graph instances currently being scheduled, key-jobId, value-PerformJobGraphInstance.
     */
    private final Map<String, PerformJobGraphInstance> schedulingJobInstanceMap;

    /**
     * Task instances currently being scheduled, key-taskId, value-PerformTaskInstance.
     * Only holds tasks that are run on their own (see againRunTask).
     */
    private final Map<String, PerformTaskInstance> schedulingTaskInstanceMap;

    /**
     * Scheduling executor, used for schedule instances. Created in start().
     */
    private ScheduledExecutorService scheduledService;

    /**
     * Thread pool that executes task instances. Created in start().
     */
    private ExecutorService taskThreadPool;

    /**
     * Creates the service with an empty pending queue and empty instance registries.
     * Executors and worker threads are not created here; that happens in {@link #start()}.
     */
    public JobGraphServiceImpl() {
        schedulingJobInstanceMap = new ConcurrentHashMap<>(64);
        schedulingTaskInstanceMap = new ConcurrentHashMap<>(64);
        pendingPerformScheduleInstanceQueue = new LinkedBlockingQueue<>();
    }

    /**
     * Started flag: set to true by start() and back to false by shutdown().
     */
    private volatile boolean isStarted;

    /**
     * Worker for the pending queue; created and started in start().
     */
    private PendingThread pendingThread;

    /**
     * Worker that queries the state of task instances that are already running;
     * created in start() with the configured task query interval.
     */
    private WatchThread watchThread;

    /**
     * Starts the service: creates both thread pools and launches the pending-queue and
     * watch workers. Calling it again while the service is started does nothing.
     */
    @Override
    public void start() {
        if (isStarted) {
            return;
        }

        // The flag is raised before any worker is created or started.
        isStarted = true;

        final Configuration configuration = ScheduleManager.getInstance().getConfiguration();
        // Both pools are sized from the same configured schedule thread count.
        scheduledService = Executors.newScheduledThreadPool(configuration.getScheduleThreadCount());
        taskThreadPool = Executors.newScheduledThreadPool(configuration.getScheduleThreadCount());

        pendingThread = new PendingThread();
        watchThread = new WatchThread(configuration.getTaskQueryInterval());

        pendingThread.start();
        watchThread.start();
    }

    /**
     * Stops the service: stops both workers, kills every job graph instance that is still
     * registered, shuts both thread pools down and waits for them to terminate, then clears
     * the pending queue and both instance registries. Does nothing if the service is not started.
     * <p>
     * If the calling thread is interrupted while waiting for a pool, the wait for that pool is
     * abandoned, the remaining cleanup still runs, and the thread's interrupt status is restored
     * before returning so the caller can observe the interruption.
     */
    @Override
    public void shutdown() {
        if (!isStarted) {
            return;
        }

        isStarted = false;
        pendingThread.shutdown();
        watchThread.shutdown();

        shutdownJobGraphInstance();

        scheduledService.shutdown();
        taskThreadPool.shutdown();

        boolean interrupted = false;
        ExecutorService[] pools = {scheduledService, taskThreadPool};
        for (ExecutorService pool : pools) {
            try {
                pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Remember the interruption; it is re-asserted once cleanup has finished so that
                // the wait on the next pool is not cut short by the pending interrupt.
                interrupted = true;
            } catch (RuntimeException e) {
                // Shutdown is best effort: report the problem and keep cleaning up.
                logger.error("shutdown", e);
            }
        }

        pendingPerformScheduleInstanceQueue.clear();
        schedulingTaskInstanceMap.clear();
        schedulingJobInstanceMap.clear();

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Kills every job graph instance that is currently registered.
     * A failure to kill one instance is logged and does not stop the remaining ones.
     */
    private void shutdownJobGraphInstance() {
        // Work on an array copy: killJobInstance removes entries from the map while we loop.
        final PerformJobGraphInstance[] running =
                schedulingJobInstanceMap.values().toArray(new PerformJobGraphInstance[0]);
        for (int i = 0; i < running.length; i++) {
            final PerformJobGraphInstance current = running[i];
            try {
                killJobInstance(current.getInstanceId(), current.getJobId());
            } catch (Exception e) {
                logger.error("shutdownJobGraphInstance", e);
            }
        }
    }

    /**
     * Returns the performance counters maintained by this service's schedule counters.
     *
     * @return whatever the schedule counters report
     */
    @Override
    public PrefCounter[] getPrefCounter() {
        final PrefCounter[] current = counters.getPrefCounter();
        return current;
    }

    /**
     * Queues a job for scheduling.
     * Does nothing when a job graph instance for the job is already registered. Otherwise the
     * job graph definition is loaded, turned into a schedule instance and appended to the
     * pending queue.
     *
     * @param jobId id of the job to schedule
     * @throws Exception if the master/parameter checks or the data service calls fail
     */
    @Override
    public void schedule(String jobId) throws Exception {
        checkMasterServer();
        checkParams(jobId);

        if (!schedulingJobInstanceMap.containsKey(jobId)) {
            // Only enqueued here; the actual scheduling happens later, so there may be a delay.
            final DataService data = ScheduleManager.getInstance().getDataService();
            final JobGraphDefinition definition = data.getDefinitionService().loadJobGraphDefinition(jobId);
            final ScheduledInstance toSchedule = data.getDefinitionService().getScheduledInstance(definition);
            final PerformScheduledInstance pending = new PerformScheduledInstance(toSchedule);
            pendingPerformScheduleInstanceQueue.add(pending);
        }
    }

    /**
     * Listener notified when a job graph instance or a task completes.
     * May be null; every call site checks for null before notifying.
     */
    private CompletedListener completedListener;

    /**
     * Continues the job graph instance that the data service reports for the given job.
     * Does nothing when the job is already registered as being scheduled, or when the data
     * service returns no instance ids for it.
     *
     * @param jobId id of the job to continue
     * @throws Exception if the checks, the lookup or the continuation fail
     */
    @Override
    public void scheduleContinue(String jobId) throws Exception {
        checkMasterServer();
        checkParams(jobId);

        if (!schedulingJobInstanceMap.containsKey(jobId)) {
            final DataInstanceService.JobGraphInstanceIds ids = ScheduleManager.getInstance()
                    .getDataService()
                    .getInstanceService()
                    .loadJobGraphInstanceIdAndRunning(jobId);
            if (ids != null) {
                jobInstanceContinue(ids.instanceId, ids.jobId);
            }
        }
    }

    /**
     * Continues a previously created job graph instance.
     * <p>
     * If an instance for the job is already registered, its current state is returned and
     * nothing else happens. Otherwise the instance is loaded, reset to RUNNING, persisted,
     * registered, its tasks are loaded into a DAG and the runnable ones are submitted.
     * When the run turns out to be complete (or an exception occurs while starting it), the
     * instance is unregistered, its completion is persisted and the listener is notified.
     *
     * @param instanceId id of the job graph instance
     * @param jobId      id of the job
     * @return the job graph state after this call
     * @throws IllegalStateException if the loaded instance is not enabled for continuation
     * @throws Exception             if the checks fail or the RUNNING state cannot be persisted
     */
    @Override
    public JobGraphState jobInstanceContinue(String instanceId, String jobId) throws Exception {
        checkMasterServer();
        checkParams(instanceId, jobId);

        // Already being scheduled: just report its state.
        // NOTE(review): this lookup and the put further down are not one atomic step, so two
        // concurrent calls for the same job could both pass this check - confirm callers serialize.
        PerformJobGraphInstance pInstance = schedulingJobInstanceMap.get(jobId);
        if (pInstance != null) {
            return pInstance.getJobGraphInstance().getJobGraphState();
        }

        DataService dataService = ScheduleManager.getInstance().getDataService();
        JobGraphInstance jobGraphInstance = dataService.getInstanceService().loadJobGraphInstance(instanceId, jobId);
        // Check the state and whether the instance may be continued
        if (!jobGraphInstance.isEnabledContinue()) {
            throw new IllegalStateException("jobGraphInstance is not repeat");
        }
        JobGraphState status = jobGraphInstance.getJobGraphState();
        if (status == JobGraphState.FINISHED) {
            return JobGraphState.FINISHED;
        }

        // Reset the instance and persist it as RUNNING before any task is started.
        jobGraphInstance.clean();
        jobGraphInstance.setStartTime(System.currentTimeMillis());
        jobGraphInstance.setJobGraphState(JobGraphState.RUNNING);
        DataInstanceService dataInstanceService = ScheduleManager.getInstance().getDataService().getInstanceService();
        int ret = dataInstanceService.updateJobGraphInstance(jobGraphInstance.getInstanceId(), jobId,
                jobGraphInstance.getJobGraphState(), jobGraphInstance.getCompletedCount(),
                jobGraphInstance.getFailCount(), jobGraphInstance.getSkipCount(), jobGraphInstance.getStartTime(),
                jobGraphInstance.getEndTime(), jobGraphInstance.getProgress(), jobGraphInstance.getExceptionInfo());
        if (ret <= 0) {
            throw new Exception("updateJobGraphInstanceState is error");
        }

        boolean isCompleted = false;
        Throwable ex = null;
        pInstance = new PerformJobGraphInstance(jobGraphInstance);
        try {
            // Register first so that the instance can be found by jobId while its tasks run.
            schedulingJobInstanceMap.put(jobId, pInstance);

            loadTaskInstance(pInstance);
            runTaskInstanceFormJobInstance(pInstance);
            // Check whether all tasks have completed
            isCompleted = pInstance.addCompletedTask(null, false);
            if (!isCompleted) {
                // Still running: persist the current progress and counters only.
                ret = dataInstanceService.updateJobGraphInstanceProcess(jobGraphInstance.getInstanceId(), jobId,
                        pInstance.getProgress(), jobGraphInstance.getCompletedCount(), jobGraphInstance.getFailCount(),
                        jobGraphInstance.getSkipCount());
                if (ret <= 0) {
                    logger.error("updateJobGraphInstanceProgress is error");
                }
            }
        } catch (Exception e) {
            // Any failure while starting the run ends the instance as FAILED.
            isCompleted = true;
            ex = e;
            jobGraphInstance.setJobGraphState(JobGraphState.FAILED);
        } finally {
            if (isCompleted) {
                schedulingJobInstanceMap.remove(jobId);

                updateJobGraphInstanceCompleted(pInstance.getJobGraphInstance(),
                        jobGraphInstance.getJobGraphState(), ex == null ? null : ex.getMessage());

                if (completedListener != null) {
                    completedListener.onJobGraphCompleted(jobId, pInstance.getInstanceId(),
                            jobGraphInstance.getJobGraphState(), ex);
                }
            }
        }

        return jobGraphInstance.getJobGraphState();
    }

    /**
     * Loads the task instances of a job graph instance and wires them into a DAG.
     *
     * @param pInstance job graph instance whose tasks are loaded
     * @throws IllegalStateException if no task instance is found
     * @throws Exception             if loading from the data service fails
     */
    private void loadTaskInstance(PerformJobGraphInstance pInstance) throws Exception {
        final List<TaskInstance> loaded = ScheduleManager.getInstance()
                .getDataService()
                .getInstanceService()
                .loadTaskInstance(pInstance.getInstanceId(), pInstance.getJobId());
        final boolean nothingLoaded = loaded == null || loaded.isEmpty();
        if (nothingLoaded) {
            throw new IllegalStateException("not found any taskInstance");
        }
        builderDAG(pInstance, loaded);
    }

    /**
     * Builds the task DAG of a job graph instance.
     * <p>
     * Every task instance is wrapped in a PerformTaskInstance and added to the job graph
     * instance. A task without predecessor ids becomes a direct child of the job graph
     * instance; any other task is added as a child of each predecessor listed in its
     * comma-separated predecessor id string.
     *
     * @param pJobInstance  job graph instance that receives the tasks
     * @param taskInstances task instances belonging to that job graph instance
     * @throws IllegalStateException if a task lists a predecessor id that is not among
     *                               {@code taskInstances}
     */
    private void builderDAG(PerformJobGraphInstance pJobInstance, List<TaskInstance> taskInstances) {
        // Put every task into a lookup map and into the job graph instance
        final Map<String, PerformTaskInstance> allTasksMap = new HashMap<>(taskInstances.size());
        for (TaskInstance taskInstance : taskInstances) {
            PerformTaskInstance pTaskInstance = new PerformTaskInstance(pJobInstance, taskInstance);
            allTasksMap.put(pTaskInstance.getTaskId(), pTaskInstance);
            pJobInstance.addTask(pTaskInstance);
        }

        // Build the topology
        for (TaskInstance taskInstance : taskInstances) {
            PerformTaskInstance pTaskInstance = allTasksMap.get(taskInstance.getTaskId());
            // No predecessors: attach directly to the job graph instance
            if (StringUtil.isEmpty(taskInstance.getPreTaskIds())) {
                pJobInstance.addChildTask(pTaskInstance);
                continue;
            }
            // Otherwise attach below every predecessor
            for (String id : taskInstance.getPreTaskIds().split(",")) {
                PerformTaskInstance preInstance = allTasksMap.get(id);
                if (preInstance == null) {
                    // Fail with a message naming both ids instead of a bare NullPointerException.
                    throw new IllegalStateException("not found pre taskId=" + id
                            + " for taskId=" + taskInstance.getTaskId());
                }
                preInstance.addChildTask(pTaskInstance);
            }
        }
        allTasksMap.clear();
    }

    /**
     * Starts the tasks of a job graph instance that is being continued, beginning with the
     * direct children of the job graph instance.
     * A task in FINISHED state is only recorded as completed; every other task is reset to
     * PENDING, receives the job graph's input parameters and is submitted for execution.
     * After all direct children were handled, the children of the finished ones are visited.
     *
     * @param pjInstance job graph instance being continued
     */
    private void runTaskInstanceFormJobInstance(PerformJobGraphInstance pjInstance) {
        final List<PerformTaskInstance> alreadyFinished = new ArrayList<>();
        for (PerformTaskInstance rootTask : pjInstance.getChildTask()) {
            final TaskInstance task = rootTask.getTaskInstance();
            if (task.getTaskState() != TaskState.FINISHED) {
                task.clean();
                task.setStartTime(System.currentTimeMillis());
                task.setTaskState(TaskState.PENDING);
                // Merge the job graph's input parameters into the task's input parameters
                rootTask.mergeInputParams(pjInstance.getInputParams());
                addTaskInstanceToThreadPool(rootTask);
            } else {
                alreadyFinished.add(rootTask);
                pjInstance.addCompletedTask(rootTask, false);
            }
        }
        // Descend below each finished direct child
        for (PerformTaskInstance finishedTask : alreadyFinished) {
            runTaskInstanceFormJobInstance(pjInstance, finishedTask);
        }
    }

    /**
     * Visits the children of a finished task and starts those that have become runnable.
     * A finished child is recorded as completed and visited recursively. Any other child
     * receives this task's output parameters and has its predecessor count increased; once
     * that call reports the child as runnable, the child is reset to PENDING, receives the
     * job graph's input parameters and is submitted for execution.
     *
     * @param pjInstance job graph instance being continued
     * @param ptInstance finished task whose children are examined
     */
    private void runTaskInstanceFormJobInstance(PerformJobGraphInstance pjInstance, PerformTaskInstance ptInstance) {
        // 1. Nothing to do without children
        if (ptInstance.getChildTaskCount() == 0) {
            return;
        }

        // 2. Walk over the children
        for (PerformTaskInstance child : ptInstance.getChildTask()) {
            if (child.getTaskInstance().getTaskState() == TaskState.FINISHED) {
                // Finished children are descended into
                pjInstance.addCompletedTask(child, false);
                runTaskInstanceFormJobInstance(pjInstance, child);
                continue;
            }

            // Feed the predecessor's output parameters into the child's input parameters
            child.mergeInputParams(ptInstance.getOutputParams());
            // Count this predecessor as done; skip the child unless it is now runnable
            if (!child.incPreTaskCountIfRunChild()) {
                continue;
            }

            child.getTaskInstance().clean();
            child.getTaskInstance().setStartTime(System.currentTimeMillis());
            child.getTaskInstance().setTaskState(TaskState.PENDING);
            // Merge the job graph's parameters into the child's input parameters
            child.mergeInputParams(pjInstance.getInputParams());
            addTaskInstanceToThreadPool(child);
        }
    }

    /**
     * Persists the task's current state and submits the task to the task thread pool.
     * A persistence failure is logged and does not prevent the submission. The resulting
     * Future is stored on the task instance.
     *
     * @param ptInstance task to persist and submit
     */
    private void addTaskInstanceToThreadPool(PerformTaskInstance ptInstance) {
        final DataService data = ScheduleManager.getInstance().getDataService();
        final TaskInstance task = ptInstance.getTaskInstance();
        try {
            data.getInstanceService().updateTaskInstanceState(
                    task.getInstanceId(),
                    task.getTaskId(),
                    task.getTaskState(),
                    task.getProgress(),
                    task.getStartTime(),
                    task.getEndTime(),
                    task.getExceptionInfo());
        } catch (Exception e) {
            logger.error("addTaskInstanceToThreadPool", e);
        }
        final TaskInstanceTask work = new TaskInstanceTask(ptInstance, false);
        ptInstance.setFuture(taskThreadPool.submit(work));
    }


    /**
     * Kills a job graph instance that is currently being scheduled.
     * The instance is unregistered, marked KILLED, its tasks are stopped, the completion is
     * persisted and the listener (if any) is notified.
     *
     * @param instanceId id of the job graph instance
     * @param jobId      id of the job
     * @return always {@link JobGraphState#KILLED}
     * @throws IllegalStateException if no instance is registered for the job
     * @throws Exception             if the checks or stopping the tasks fail
     */
    @Override
    public JobGraphState killJobInstance(String instanceId, String jobId) throws Exception {
        checkMasterServer();
        checkParams(instanceId, jobId);

        final PerformJobGraphInstance removed = schedulingJobInstanceMap.remove(jobId);
        if (removed == null) {
            throw new IllegalStateException("PerformJobGraphInstance is not exist");
        }

        final JobGraphState killed = JobGraphState.KILLED;
        removed.getJobGraphInstance().setJobGraphState(killed);
        killTaskInstance(removed);
        updateJobGraphInstanceCompleted(removed.getJobGraphInstance(), killed, null);

        if (completedListener != null) {
            completedListener.onJobGraphCompleted(jobId, instanceId, killed, null);
        }
        return killed;
    }

    /**
     * Stops all task instances of a job graph instance.
     * <p>
     * Tasks whose state value is greater than RUNNING are skipped. For every other task the
     * local Future (if any) is cancelled. Tasks invoked over HTTP that have a known node ip
     * are additionally sent a stop message; whatever state the node answers with, the task
     * instance itself is persisted as KILLED. If the node reports FINISHED, the task's output
     * parameters are also stored on the job graph instance.
     *
     * @param pjInstance job graph instance whose tasks are stopped
     * @throws Exception if obtaining the HTTP handler or a persistence call fails
     */
    @SuppressWarnings("unchecked")
    private void killTaskInstance(PerformJobGraphInstance pjInstance) throws Exception {
        DataService dataService = ScheduleManager.getInstance().getDataService();
        PerformTaskInstance[] values = pjInstance.getAllTasks();
        for (PerformTaskInstance ptInstance : values) {
            TaskState state = ptInstance.getTaskInstance().getTaskState();
            // States beyond RUNNING are left untouched.
            if (state.getValue() > TaskState.RUNNING.getValue()) {
                continue;
            }
            Future<?> future = ptInstance.getFuture();
            if (future != null) {
                future.cancel(true);
            }
            // Only HTTP tasks with a known node can be sent a stop message.
            if (ptInstance.getTaskInstance().getInvokeKind() != TaskInvokeHandler.InvokeKind.HTTP) {
                continue;
            }
            if (StringUtil.isEmpty(ptInstance.getNodeIp())) {
                continue;
            }

            int ret;
            Map<String, Object> resp;
            TaskHttpInvokeHandler<?> handler = (TaskHttpInvokeHandler<?>) getTaskInvokeHandler(
                    TaskInvokeHandler.InvokeKind.HTTP);
            HttpInvokeObject invokeObject = new HttpInvokeObject();
            invokeObject.set(ptInstance.getTaskInstance());
            invokeObject.setInputParam(ptInstance.getInputParams());
            invokeObject.setIp(ptInstance.getNodeIp());
            invokeObject.setPort(ptInstance.getNodePort());
            try {
                resp = (Map<String, Object>) handler.invokeStop(invokeObject);
            } catch (Exception e) {
                // A failed stop request is logged; the task is still recorded as KILLED below.
                logger.error("killTaskInstance", e);
                resp = null;
            }
            state = TaskState.KILLED;
            ptInstance.setTimestamp(System.currentTimeMillis());
            ptInstance.getTaskInstance().setTaskState(state);
            ptInstance.getTaskInstance().setProgress(100);
            ptInstance.getTaskInstance().setEndTime(System.currentTimeMillis());
            if (resp != null) {
                // The reported state only decides whether output parameters are stored below;
                // the task instance keeps the KILLED state set above.
                state = TaskState.of(ResponseUtil.getState(resp, -1));
                if (state.getValue() < TaskState.SKIP.getValue()) {
                    state = TaskState.KILLED;
                }
                ptInstance.getTaskInstance().setExceptionInfo(ResponseUtil.getException(resp, null));
                ptInstance.getTaskInstance().setOutputParamJson(ResponseUtil.getOutputParams(resp, null));
                ptInstance.mergeOutputParams(ptInstance.getTaskInstance().getOutputParamJson());
            }
            if (state == TaskState.FINISHED) {
                // The second argument is the job id (previously the instance id was passed twice,
                // so the update was keyed by the wrong value).
                ret = dataService.getInstanceService().updateJobGraphInstanceParam(ptInstance.getInstanceId(),
                        pjInstance.getJobId(), ptInstance.getOutputParams().toJson());
                if (ret <= 0) {
                    logger.error("updateJobGraphInstanceParam is error");
                }
            }

            ret = dataService.getInstanceService().updateTaskInstanceState(ptInstance.getInstanceId(),
                    ptInstance.getTaskId(), ptInstance.getTaskInstance().getTaskState(),
                    ptInstance.getTaskInstance().getProgress(),
                    ptInstance.getTaskInstance().getStartTime(),
                    ptInstance.getTaskInstance().getEndTime(),
                    ptInstance.getTaskInstance().getExceptionInfo());
            if (ret <= 0) {
                logger.error("updateTaskInstanceState is error");
            }
        }
    }

    /**
     * Runs a single task instance again, on its own, regardless of its predecessors.
     * The task is loaded, reset to PENDING, persisted, registered in the standalone task
     * registry and submitted to the task thread pool.
     *
     * @param instanceId id of the job graph instance the task belongs to
     * @param jobId      id of the job
     * @param taskId     id of the task to run again
     * @return the task state after submission
     * @throws IllegalStateException if the task is already registered as running on its own,
     *                               is not enabled for repetition, is already FINISHED, or
     *                               its PENDING state cannot be persisted
     * @throws Exception             if the checks or the data service calls fail
     */
    @Override
    public TaskState againRunTask(String instanceId, String jobId, String taskId) throws Exception {
        checkMasterServer();
        checkParams(instanceId, jobId, taskId);

        // Look the task up without removing it: a task that is still running must stay
        // registered, otherwise a second call would be allowed to start a duplicate run.
        if (schedulingTaskInstanceMap.containsKey(taskId)) {
            throw new IllegalStateException("PerformTaskInstance is exist");
        }

        // Load the instance data; the task runs on its own, predecessors are not considered
        DataService dataService = ScheduleManager.getInstance().getDataService();
        TaskInstance taskInstance = dataService.getInstanceService().loadTaskInstance(instanceId, jobId, taskId);
        if (!taskInstance.isEnabledRepeat()) {
            throw new IllegalStateException("PerformTaskInstance is not enabled to repeat");
        }
        if (taskInstance.getTaskState() == TaskState.FINISHED) {
            throw new IllegalStateException("PerformTaskInstance status is finish");
        }

        taskInstance.clean();
        taskInstance.setTaskState(TaskState.PENDING);
        taskInstance.setStartTime(System.currentTimeMillis());
        // Update the data store
        int ret = dataService.getInstanceService().updateTaskInstanceState(instanceId, taskId,
                taskInstance.getTaskState(), taskInstance.getProgress(), taskInstance.getStartTime(),
                taskInstance.getEndTime(), taskInstance.getExceptionInfo());
        if (ret <= 0) {
            throw new IllegalStateException("updateTaskInstanceState is error");
        }

        PerformTaskInstance pInstance = new PerformTaskInstance(taskInstance);
        schedulingTaskInstanceMap.put(taskId, pInstance);
        Future<?> future = taskThreadPool.submit(new TaskInstanceTask(pInstance, true));
        pInstance.setFuture(future);
        return taskInstance.getTaskState();
    }

    /**
     * Handles a state report sent for a running task.
     * The report's state, progress, exception info and output parameters are copied onto the
     * matching task instance, which is then processed like a node response.
     * <p>
     * When this server may not perform scheduling, an error response is returned. Any other
     * problem (unknown job or task, processing failure) is logged and a success response is
     * still returned.
     *
     * @param request report fields as key/value pairs
     * @return an error response if this server is not the master, otherwise a success response
     */
    @Override
    public ResponseMessage<?> taskReportRequest(Map<String, Object> request) {
        if (!ScheduleManager.getInstance().getPlatformHAStrategy().enabledPerform()) {
            return ResponseMessage.error(ResponseMessage.NOT_MASTER_SERVER, "not master server", null);
        }

        try {
            String jobId = ResponseUtil.getJobId(request, "");
            PerformJobGraphInstance pjInstance = schedulingJobInstanceMap.get(jobId);
            if (pjInstance == null) {
                throw new Exception("not found jobId=" + jobId + " PerformJobGraphInstance");
            }
            String taskId = ResponseUtil.getTaskId(request, "");
            PerformTaskInstance ptInstance = pjInstance.findPerformTaskInstance(taskId);
            if (ptInstance == null) {
                throw new Exception("not found taskId=" + taskId + " PerformTaskInstance");
            }
            TaskState taskState = TaskState.of(ResponseUtil.getState(request, -1));
            ptInstance.setTimestamp(System.currentTimeMillis());
            ptInstance.getTaskInstance().setTaskState(taskState);
            ptInstance.getTaskInstance().setProgress(ResponseUtil.getCurProgress(request, 0));
            // Exception info comes from the report's exception field, not from its output
            // parameters (which were previously stored here by mistake).
            ptInstance.getTaskInstance().setExceptionInfo(ResponseUtil.getException(request, null));
            ptInstance.getTaskInstance().setOutputParamJson(ResponseUtil.getOutputParams(request, null));
            ptInstance.mergeOutputParams(ptInstance.getTaskInstance().getOutputParamJson());

            processNodeResponse(ptInstance, false);
        } catch (Exception e) {
            logger.error("taskReportRequest", e);
        }
        return ResponseMessage.success();
    }

    /**
     * Installs the listener that is notified about completed job graphs and tasks.
     *
     * @param listener listener to notify; stored as given, replacing any previous one
     */
    @Override
    public void setCompletedListener(CompletedListener listener) {
        completedListener = listener;
    }

    /**
     * Returns the currently installed completion listener.
     *
     * @return the listener last passed to setCompletedListener, or null if none was set
     */
    @Override
    public CompletedListener getCompletedListener() {
        return this.completedListener;
    }

    /**
     * Runnable that executes one task instance.
     * The task's state is not checked for being completed or failed beforehand; the node (or
     * local command) is invoked directly and the returned state decides what happens next.
     * The task's input parameters must have been merged before this task is submitted.
     */
    class TaskInstanceTask implements Runnable {
        private final PerformTaskInstance ptInstance;

        /**
         * true means only this one task instance is run and nothing else is processed.
         * Supports re-running a single task instance on its own.
         */
        private final boolean onlyOne;

        /**
         * @param ptInstance task instance to execute
         * @param onlyOne    true to run the task on its own, without job graph handling
         */
        public TaskInstanceTask(PerformTaskInstance ptInstance, boolean onlyOne) {
            this.ptInstance = ptInstance;
            this.onlyOne = onlyOne;
        }

        /**
         * Persists the task's input parameters, invokes the task (over HTTP or as a local
         * command) and hands the outcome to processNodeResponse. Any exception is routed to
         * taskFailProcess together with the depth at which it occurred.
         */
        @SuppressWarnings("all")
        private void execTask() {
            DataService dataService = ScheduleManager.getInstance().getDataService();
            // Records how far processing got before an exception occurred
            int exceptionDepth = 0;
            try {
                // Store the task instance's input parameters
                int ret = dataService.getInstanceService().updateTaskInstanceState(ptInstance.getInstanceId(),
                        ptInstance.getTaskId(), ptInstance.getInputParams().toJson(), true);
                if (ret <= 0) {
                    throw new IllegalStateException("updateTaskInstanceState is error");
                }

                // Invoke the task instance and inspect the result
                TaskInvokeHandler.InvokeKind invokeKind = ptInstance.getTaskInstance().getInvokeKind();
                TaskInvokeHandler handler = getTaskInvokeHandler(invokeKind);
                if (invokeKind == TaskInvokeHandler.InvokeKind.HTTP) {
                    try {
                        invokeNodeHttp((TaskHttpInvokeHandler<?>) handler);
                    } catch (Exception e2) {
                        exceptionDepth = 1;
                        throw e2;
                    }
                } else {// local jar or shell
                    try {
                        AbstractTaskInvokeHandler.LocalResponse resp =
                                (AbstractTaskInvokeHandler.LocalResponse) handler.invoke(
                                        ptInstance.getTaskInstance().getCommandLine());
                        if (resp.exitCode == 0) { // success
                            ptInstance.getTaskInstance().setTaskState(TaskState.FINISHED);
                        } else {// failure
                            ptInstance.getTaskInstance().setTaskState(TaskState.FAILED);
                        }
                        ptInstance.setTimestamp(System.currentTimeMillis());
                        ptInstance.getTaskInstance().setProgress(100);
                        ptInstance.getTaskInstance().setExceptionInfo(resp.exceptionInfo);
                        ptInstance.getTaskInstance().setEndTime(System.currentTimeMillis());
                    } catch (Exception e3) {
                        exceptionDepth = 1;
                        throw e3;
                    }
                }
                exceptionDepth = processNodeResponse(ptInstance, onlyOne);
            } catch (Exception e) {
                taskFailProcess(exceptionDepth, e);
            }
        }

        /**
         * Sends the task to a node over HTTP and copies the node's answer (node address,
         * state, progress, exception info, output parameters) onto the task instance.
         * A failed request is attempted up to 3 times in total, pausing 100 ms between
         * attempts; if every attempt fails, the last failure is rethrown.
         *
         * @param handler HTTP invoke handler used for the request
         * @throws Exception the last request failure, or InterruptedException if the thread
         *                   is interrupted while pausing between attempts
         */
        @SuppressWarnings("all")
        private void invokeNodeHttp(TaskHttpInvokeHandler<?> handler) throws Exception {
            final int maxAttempts = 3;
            final long pauseMillis = 100L;

            HttpInvokeObject invokeObject = new HttpInvokeObject();
            invokeObject.set(ptInstance.getTaskInstance());
            invokeObject.setInputParam(ptInstance.getInputParams());

            Map<String, Object> resp = null;
            Exception lastFailure = null;
            // Bounded retry: the previous loop decremented its counter but never checked it,
            // so a node that kept failing was retried forever.
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    resp = (Map<String, Object>) handler.invoke(invokeObject);
                    lastFailure = null;
                    break;
                } catch (Exception e) {
                    lastFailure = e;
                    // No pause after the final attempt; the failure is reported right away.
                    if (attempt < maxAttempts) {
                        Thread.sleep(pauseMillis);
                    }
                }
            }
            if (lastFailure != null) {
                throw lastFailure;
            }

            ptInstance.setNodeIp(ResponseUtil.getNodeIp(resp, null));
            ptInstance.setNodePort(ResponseUtil.getNodePort(resp, -1));
            TaskState state = TaskState.of(ResponseUtil.getState(resp, -1));
            ptInstance.setTimestamp(System.currentTimeMillis());
            ptInstance.getTaskInstance().setTaskState(state);
            ptInstance.getTaskInstance().setProgress(ResponseUtil.getCurProgress(resp, 0));
            ptInstance.getTaskInstance().setExceptionInfo(ResponseUtil.getException(resp, null));
            ptInstance.getTaskInstance().setOutputParamJson(ResponseUtil.getOutputParams(resp, null));
            ptInstance.mergeOutputParams(ptInstance.getTaskInstance().getOutputParamJson());
        }

        /**
         * Handles an exception raised while executing the task.
         * For depths below 3 the task is persisted as FAILED (and, when run on its own,
         * removed from the standalone registry). When the task belongs to a job graph run:
         * an auto-skip task lets its children continue; otherwise the job graph instance is
         * unregistered and persisted as FAILED.
         *
         * @param exceptionDepth how far processing got before the exception occurred
         * @param ex             the exception that was raised
         */
        private void taskFailProcess(int exceptionDepth, Throwable ex) {
            DataService dataService = ScheduleManager.getInstance().getDataService();
            // Record the failure on the TaskInstance; at these depths no completed state was reached
            if (exceptionDepth < 3) {
                if (onlyOne) {
                    schedulingTaskInstanceMap.remove(ptInstance.getTaskId());
                }
                try {
                    ptInstance.getTaskInstance().setEndTime(System.currentTimeMillis());
                    ptInstance.getTaskInstance().setTaskState(TaskState.FAILED);
                    ptInstance.getTaskInstance().setExceptionInfo(ex.getMessage());
                    dataService.getInstanceService().updateTaskInstanceState(ptInstance.getInstanceId(),
                            ptInstance.getTaskId(), ptInstance.getTaskInstance().getTaskState(), 100,
                            ptInstance.getTaskInstance().getStartTime(),
                            ptInstance.getTaskInstance().getEndTime(), ex.getMessage());
                } catch (Exception e) {
                    logger.error("taskFailProcess", e);
                }
            }
            if (!onlyOne) {
                PerformJobGraphInstance pjInstance = ptInstance.getPerformJobGraphInstance();
                // Auto-skip tasks let their children continue
                if (ptInstance.getTaskInstance().isAutoSkip())
                    runChildTasks(pjInstance, ptInstance, true);
                else {
                    // Mark the JobGraphInstance as failed and stop running it
                    schedulingJobInstanceMap.remove(pjInstance.getJobId());

                    try {
                        pjInstance.getJobGraphInstance().setJobGraphState(JobGraphState.FAILED);
                        pjInstance.getJobGraphInstance().setEndTime(System.currentTimeMillis());
                        dataService.getInstanceService().updateJobGraphInstance(pjInstance.getInstanceId(),
                                pjInstance.getJobId(), pjInstance.getJobGraphInstance().getJobGraphState(),
                                pjInstance.getJobGraphInstance().getCompletedCount(),
                                pjInstance.getJobGraphInstance().getFailCount(),
                                pjInstance.getJobGraphInstance().getSkipCount(),
                                pjInstance.getJobGraphInstance().getStartTime(),
                                pjInstance.getJobGraphInstance().getEndTime(),
                                100, ex.getMessage());
                    } catch (Exception e) {
                        logger.error("taskFailProcess", e);
                    }
                }
            }
        }

        /**
         * Executes the task instance.
         */
        @Override
        public void run() {
            execTask();
        }
    }

    /**
     * 处理调用节点响应返回的消息, 并判断是否需要运行孩子任务
     *
     * @param ptInstance 任务实例
     * @param onlyOne    是否单独运行实例,true-是
     * @return 返回异常深度, 从1开始
     */
    private int processNodeResponse(PerformTaskInstance ptInstance, boolean onlyOne) {
        DataService dataService = ScheduleManager.getInstance().getDataService();
        PerformJobGraphInstance pjInstance = null;
        // Stage marker handed back to the caller; it starts at 1 and is overwritten
        // each time one of the stages below is entered.
        int exceptionDepth = 1;
        int ret;
        // Snapshot of the task state taken on entry; every persistence call below
        // writes this snapshot, not the state re-read from the task instance.
        TaskState taskState = ptInstance.getTaskInstance().getTaskState();
        try {
            // Store the job graph instance's parameters (only for tasks that belong to a job graph)
            if (!onlyOne) {
                pjInstance = ptInstance.getPerformJobGraphInstance();
                // Persist only when the merge reports true.
                if (pjInstance.mergeJobGraphParams(ptInstance.getOutputParams())) {
                    ret = dataService.getInstanceService().updateJobGraphInstanceParam(pjInstance.getInstanceId(),
                            pjInstance.getJobId(), pjInstance.getInputParams().toJson());
                    if (ret <= 0)
                        throw new IllegalStateException("updateJobGraphInstanceParam is error");
                }
            }
            // Handle the task differently depending on the reported state
            boolean isProcessChild = false;
            if (taskState.getValue() <= TaskState.RUNNING.getValue()) {// still running (state value not above RUNNING)
                exceptionDepth = 2;
                // Only state, progress and start time are written; end time and exception info stay null.
                ret = dataService.getInstanceService().updateTaskInstanceState(ptInstance.getInstanceId(),
                        ptInstance.getTaskId(), taskState, ptInstance.getTaskInstance().getProgress(),
                        ptInstance.getTaskInstance().getStartTime(), null, null);
                if (ret <= 0)
                    throw new IllegalStateException("updateTaskInstanceState is error");
            } else if (taskState == TaskState.FINISHED) {
                exceptionDepth = 3;
                // A standalone task is dropped from the standalone-task map once it has finished.
                if (onlyOne) {
                    schedulingTaskInstanceMap.remove(ptInstance.getTaskId());
                }
                ptInstance.getTaskInstance().setEndTime(System.currentTimeMillis());
                ret = dataService.getInstanceService().updateTaskInstanceState(ptInstance.getInstanceId(),
                        ptInstance.getTaskId(), taskState, 100, ptInstance.getTaskInstance().getStartTime(),
                        ptInstance.getTaskInstance().getEndTime(), null);
                if (ret <= 0) {
                    throw new IllegalStateException("updateTaskInstanceState is error");
                }
                isProcessChild = true;

                // NOTE(review): getPerformJobGraphInstance() is dereferenced here even when onlyOne
                // is true - confirm it is never null for standalone tasks.
                if (completedListener != null) {
                    completedListener.onTaskCompleted(ptInstance.getPerformJobGraphInstance().getJobId(),
                            ptInstance.getInstanceId(), ptInstance.getTaskId(),
                            ptInstance.getTaskInstance().getTaskState(), null);
                }
            } else { // any other state above RUNNING is handled as a failure
                exceptionDepth = 4;
                if (onlyOne) {
                    schedulingTaskInstanceMap.remove(ptInstance.getTaskId());
                }
                ptInstance.getTaskInstance().setEndTime(System.currentTimeMillis());
                if (!onlyOne) {// check whether the failed task may be skipped automatically
                    if (ptInstance.getTaskInstance().isAutoSkip()) {
                        // The in-memory state becomes SKIP and the children will be driven below.
                        ptInstance.getTaskInstance().setTaskState(TaskState.SKIP);
                        isProcessChild = true;
                    } else {
                        pjInstance.getJobGraphInstance().incFailCount();
                    }
                }
                // The row is written with the entry snapshot `taskState`, so an auto-skipped task is
                // still persisted with its original state while the listener below receives SKIP.
                // NOTE(review): confirm this difference is intended.
                ret = dataService.getInstanceService().updateTaskInstanceState(ptInstance.getInstanceId(),
                        ptInstance.getTaskId(), taskState, 100, ptInstance.getTaskInstance().getStartTime(),
                        ptInstance.getTaskInstance().getEndTime(),
                        ptInstance.getTaskInstance().getExceptionInfo());
                if (ret <= 0)
                    throw new IllegalStateException("updateTaskInstanceState is error");

                // NOTE(review): same getPerformJobGraphInstance() dereference as in the FINISHED branch.
                if (completedListener != null) {
                    completedListener.onTaskCompleted(ptInstance.getPerformJobGraphInstance().getJobId(),
                            ptInstance.getInstanceId(), ptInstance.getTaskId(),
                            ptInstance.getTaskInstance().getTaskState(),
                            new Exception(ptInstance.getTaskInstance().getExceptionInfo()));
                }
            }

            // Store this task instance's output parameters (only when children will be driven)
            if (isProcessChild && ptInstance.getOutputParams().size() > 0) {
                exceptionDepth = 2;
                ret = dataService.getInstanceService().updateTaskInstanceState(ptInstance.getInstanceId(),
                        ptInstance.getTaskId(), ptInstance.getOutputParams().toJson(), false);
                if (ret <= 0)
                    throw new IllegalStateException("updateTaskInstanceState is error");
            }
            // Drive the child nodes when the task belongs to a job graph and finished or was skipped
            if (!onlyOne && isProcessChild) {
                runChildTasks(pjInstance, ptInstance, ptInstance.getTaskInstance().getTaskState() == TaskState.SKIP);
            }
        } catch (Exception e) {
            // Failures are logged and swallowed; the caller only receives the stage marker.
            logger.error("processNodeResponse", e);
        }
        // Returned on every path, i.e. also when no exception was raised.
        return exceptionDepth;
    }

    /**
     * Registers a completed (or skipped) task with its job graph instance and then either
     * closes the job graph instance or tries to start the children of the task.
     *
     * @param pjInstance job graph instance the task belongs to
     * @param ptInstance task instance that has just been completed
     * @param isSkip     {@code true} when the task is registered as skipped
     */
    private void runChildTasks(PerformJobGraphInstance pjInstance, PerformTaskInstance ptInstance, boolean isSkip) {
        DataService dataService = ScheduleManager.getInstance().getDataService();

        if (!pjInstance.addCompletedTask(ptInstance, isSkip)) {
            // The job graph is still running: persist its progress counters first.
            try {
                dataService.getInstanceService().updateJobGraphInstanceProcess(pjInstance.getInstanceId(),
                        pjInstance.getJobId(), pjInstance.getProgress(),
                        pjInstance.getJobGraphInstance().getCompletedCount(),
                        pjInstance.getJobGraphInstance().getFailCount(),
                        pjInstance.getJobGraphInstance().getSkipCount());
            } catch (Exception e) {
                logger.error("runChildTasks", e);
            }

            if (ptInstance.getChildTaskCount() == 0) {
                return;
            }

            PerformTaskInstance[] children = ptInstance.getChildTask();
            for (int i = 0; i < children.length; i++) {
                PerformTaskInstance child = children[i];
                // Hand this task's output parameters to the child ...
                child.mergeInputParams(ptInstance.getOutputParams());
                // ... followed by the parameters of the job graph instance.
                child.mergeInputParams(ptInstance.getPerformJobGraphInstance().getInputParams());
                // Submit the child only when incPreTaskCountIfRunChild() allows it.
                if (child.incPreTaskCountIfRunChild()) {
                    Future<?> childFuture = taskThreadPool.submit(new TaskInstanceTask(child, false));
                    child.setFuture(childFuture);
                }
            }
            return;
        }

        // The whole job graph instance has completed: drop its mapping and persist the final state.
        schedulingJobInstanceMap.remove(pjInstance.getJobId());
        try {
            JobGraphInstance graph = pjInstance.getJobGraphInstance();
            graph.setJobGraphState(JobGraphState.FINISHED);
            dataService.getInstanceService().updateJobGraphInstance(pjInstance.getInstanceId(),
                    pjInstance.getJobId(), graph.getJobGraphState(),
                    graph.getCompletedCount(),
                    graph.getFailCount(),
                    graph.getSkipCount(),
                    graph.getStartTime(),
                    graph.getEndTime(),
                    100, null);
        } catch (Exception e) {
            logger.error("runChildTasks", e);
        }
    }

    /**
     * 创建和调度流程实例执行
     * 这里的任务实例都是新创建的
     */
    class ScheduleTask implements Runnable {
        private final PerformScheduledInstance performScheduledInstance;

        ScheduleTask(PerformScheduledInstance instance) {
            this.performScheduledInstance = instance;
        }

        public void run() {
            boolean f = generatorDAGAndRunning();
            if (f) calcNextTime();
        }

        private boolean generatorDAGAndRunning() {
            PerformJobGraphInstance pJobInstance = null;
            try {
                pJobInstance = createPerformJobGraphInstance();
                JobGraphServiceImpl.this.schedulingJobInstanceMap.put(pJobInstance.getJobId(), pJobInstance);
                loadTaskInstance(pJobInstance);
                runTaskInstance(pJobInstance);
                return true;
            } catch (Exception e) {
                if (pJobInstance != null) {
                    JobGraphServiceImpl.this.schedulingJobInstanceMap.remove(pJobInstance.getJobId());
                    updateJobGraphInstanceCompleted(pJobInstance.getJobGraphInstance(), JobGraphState.FAILED,
                            e.getMessage());
                    if (JobGraphServiceImpl.this.completedListener != null) {
                        JobGraphServiceImpl.this.completedListener.onJobGraphCompleted(pJobInstance.getJobId(),
                                pJobInstance.getInstanceId(), JobGraphState.FAILED, e);
                    }
                }
                return false;
            }
        }

        private PerformJobGraphInstance createPerformJobGraphInstance() throws Exception {
            IdsGenerator idsGenerator = ScheduleManager.getInstance().getIdsGenerator();
            JobGraphInstance jInstance = new JobGraphInstance();
            jInstance.set(performScheduledInstance.getScheduledInstance());
            jInstance.setInstanceId(idsGenerator.generateJobInstanceId());
            jInstance.setJobGraphState(JobGraphState.RUNNING);
            jInstance.setStartTime(System.currentTimeMillis());

            DataInstanceService dataInstanceService = ScheduleManager.getInstance().getDataService().getInstanceService();
            int ret = dataInstanceService.insertJobGraphInstance(jInstance);
            if (ret <= 0)
                throw new Exception("scheduleTask insertJobGraphInstance is error");
            return new PerformJobGraphInstance(jInstance);
        }

        /**
         * 装载任务并转换成任务实例
         * 构建一个DAG图
         *
         * @param pjInstance 任务流程
         * @throws Exception 异常
         */
        private void loadTaskInstance(PerformJobGraphInstance pjInstance) throws Exception {
            DataService dataService = ScheduleManager.getInstance().getDataService();
            List<TaskDefinition> taskDefinitions = dataService.getDefinitionService()
                    .loadTaskDefinition(pjInstance.getJobId());
            if (taskDefinitions == null || taskDefinitions.size() == 0)
                throw new IllegalStateException("not found any taskDefinition");
            List<TaskInstance> taskInstances = convertAndInsert(taskDefinitions);
            if (taskInstances.size() != taskDefinitions.size())
                throw new IllegalStateException("not found any taskInstance");

            builderDAG(pjInstance, taskInstances);
        }

        private List<TaskInstance> convertAndInsert(List<TaskDefinition> taskDefinitions) throws Exception {
            DataService dataService = ScheduleManager.getInstance().getDataService();
            IdsGenerator idsGenerator = ScheduleManager.getInstance().getIdsGenerator();

            List<TaskInstance> taskInstances = new ArrayList<>(taskDefinitions.size());
            for (TaskDefinition def : taskDefinitions) {
                TaskInstance instance = new TaskInstance();
                instance.setInstanceId(idsGenerator.generateJobInstanceId());
                instance.set(def);
                instance.setStartTime(System.currentTimeMillis());
                instance.setTaskState(TaskState.PENDING);
                taskInstances.add(instance);
            }
            int ret = dataService.getInstanceService().insertTaskInstance(taskInstances);
            checkReturn(ret, taskInstances.size(), "insertTaskInstance(list) is error");
            return taskInstances;
        }

        /**
         * 运行任务实例来自新创建的
         *
         * @param pjInstance 流程任务
         */
        private void runTaskInstance(PerformJobGraphInstance pjInstance) {
            PerformTaskInstance[] childTasks = pjInstance.getChildTask();
            for (PerformTaskInstance ptInstance : childTasks) {
                // 整合任务实例输入参数
                ptInstance.mergeInputParams(pjInstance.getInputParams());
                Future<?> future = taskThreadPool.submit(new TaskInstanceTask(ptInstance, false));
                ptInstance.setFuture(future);
            }
        }

        @SuppressWarnings("all")
        private void calcNextTime() {
            JobGraphServiceImpl.this.counters.addCounter(ScheduleCounters.CounterKind.COMPLETED, 1);

            Throwable ex = null;
            PerformScheduledInstance psInstance = this.performScheduledInstance;
            ScheduledInstance sInstance = psInstance.getScheduledInstance();
            Date nextTime = psInstance.calcNextTime(new Date());
            if (nextTime == null) {// 调度结束
                if (JobGraphServiceImpl.this.completedListener != null) {
                    JobGraphServiceImpl.this.completedListener.onScheduledCompleted(sInstance.getJobId(), ex);
                }
            } else { // 再次调度,调度状态应该不需要改变并且在调度队列中
                try {
                    sInstance.setNextScheduleTime(nextTime.getTime());
                    ScheduleTask scheduleTask = new ScheduleTask(psInstance);
                    ScheduledFuture<?> future = JobGraphServiceImpl.this.scheduledService.schedule(scheduleTask,
                            (sInstance.getNextScheduleTime() - System.currentTimeMillis()),
                            TimeUnit.MILLISECONDS);
                    psInstance.setFuture(future);
                } catch (Exception e) {
                    ex = e;
                    logger.error("can not again schedule jobId=" + sInstance.getJobId(), e);
                } finally {
                    if (ex != null) {
                        if (psInstance.getFuture() != null) {
                            psInstance.getFuture().cancel(true);
                        }
                    }
                }
            }
        }

    }

    /**
     * 生成调度任务,监控pendingPerformScheduleInstanceQueue
     * 提交给调度服务运行
     */
    class PendingThread extends Thread {
        private volatile boolean isStop;

        PendingThread() {
            super("JobGraphService-PendingThread");
            this.isStop = false;
        }

        @SuppressWarnings("all")
        @Override
        public void run() {
            while (!this.isStop) {
                Object instance = null;
                try {
                    instance = JobGraphServiceImpl.this.pendingPerformScheduleInstanceQueue.take();
                } catch (Exception e) {
                    // ignore
                }

                if (instance == QUEUE_NULL || this.isStop) {
                    break;
                }
                if (instance == null) {
                    continue;
                }

                Throwable ex = null;
                DataService dataService = ScheduleManager.getInstance().getDataService();
                PerformScheduledInstance pInstance = (PerformScheduledInstance) instance;
                ScheduledInstance sInstance = pInstance.getScheduledInstance();
                boolean isRemove = false;
                try {
                    // 计算下一个调度时间
                    Date nextDate = pInstance.calcNextTime(new Date());
                    sInstance.setNextScheduleTime(nextDate == null ? null : nextDate.getTime());
                    // 这里不应该为空值,为空认为出现异常,调度表达式可能错误
                    if (nextDate != null) {
                        // 创建调度任务并定时执行
                        ScheduleTask scheduleTask = new ScheduleTask(pInstance);
                        ScheduledFuture<?> future = JobGraphServiceImpl.this.scheduledService.schedule(scheduleTask,
                                (sInstance.getNextScheduleTime() - System.currentTimeMillis()),
                                TimeUnit.MILLISECONDS);
                        pInstance.setFuture(future);
                        isRemove = true;

                        //发送同步状态数据到备服务器
                        ScheduleManager.getInstance().getStateSyncService().sendState(StateSyncService
                                .StateDTO.of(StateSyncService.StateDTO.STATE_ADD, pInstance.getJobId()));
                    } else {
                        logger.error("CronExpression getNextDate is null");
                        ex = new IllegalStateException("CronExpression is error");
                    }
                } catch (Exception e) {
                    ex = e;
                } finally {
                    if (ex != null) {
                        logger.error("PendingThread schedule is error", ex);

                        JobGraphServiceImpl.this.counters.addCounter(ScheduleCounters.CounterKind.EXCEPTION, 1);
                        if (isRemove) {
                            if (pInstance.getFuture() != null) {
                                pInstance.getFuture().cancel(true);
                            }
                        }
                        if (JobGraphServiceImpl.this.completedListener != null)
                            JobGraphServiceImpl.this.completedListener.onScheduledCompleted(sInstance.getJobId(), ex);
                    }
                }
            }
        }

        void shutdown() {
            if (this.isStop) return;

            this.isStop = true;
            pendingPerformScheduleInstanceQueue.add(QUEUE_NULL);
        }
    }

    /**
     * Thread that periodically queries the progress of task instances that are still
     * running. Only HTTP-invoked tasks with a known node address are polled.
     */
    class WatchThread extends Thread {
        private volatile boolean isStop;
        private final Object lock = new Object();
        // Passed to Object.wait(long) between two rounds and to isIdleTimeout(...) as the idle threshold.
        private final int interval;

        WatchThread(int interval) {
            super("JobGraphService-WatchThread");
            this.isStop = false;
            this.interval = interval;
        }

        @Override
        public void run() {
            while (!this.isStop) {
                synchronized (lock) {
                    // Re-check under the monitor: shutdown() raises the flag before it notifies,
                    // so a notify issued between the loop test and the wait would otherwise be
                    // lost and the thread would sleep a whole interval before stopping.
                    if (!this.isStop) {
                        try {
                            lock.wait(this.interval);
                        } catch (Exception e) {
                            // ignore
                        }
                    }
                }

                if (this.isStop) {
                    break;
                }

                doProcess();
            }
        }

        /** Pairs a task instance with the flag telling whether it runs standalone. */
        class InternalInstance {
            final PerformTaskInstance ptInstance;
            final boolean onlyOne;

            InternalInstance(PerformTaskInstance ptInstance, boolean onlyOne) {
                this.ptInstance = ptInstance;
                this.onlyOne = onlyOne;
            }
        }

        /**
         * Tells whether a task instance has to be polled in this round: it must be invoked
         * over HTTP, its state value must not be above RUNNING, it must have been idle for
         * the interval and its node ip must be known.
         */
        private boolean needsCheck(PerformTaskInstance ptInstance) {
            if (ptInstance.getTaskInstance().getInvokeKind() != TaskInvokeHandler.InvokeKind.HTTP) {
                return false;
            }
            if (ptInstance.getTaskInstance().getTaskState().getValue() > TaskState.RUNNING.getValue()) {
                return false;
            }
            return ptInstance.isIdleTimeout(interval) && !StringUtil.isEmpty(ptInstance.getNodeIp());
        }

        /**
         * One polling round: collects the task instances to check, queries each of them
         * on its node and feeds the answer into {@code processNodeResponse}.
         */
        private void doProcess() {
            List<InternalInstance> checkInstance = new ArrayList<>();

            // 1. Standalone task instances.
            try {
                for (PerformTaskInstance ptInstance : schedulingTaskInstanceMap.values()) {
                    if (needsCheck(ptInstance)) {
                        checkInstance.add(new InternalInstance(ptInstance, true));
                    }
                }
            } catch (ConcurrentModificationException ex) {
                logger.error("doProcess", ex);
            }

            // 2. Task instances of every job graph instance that is being scheduled.
            try {
                for (PerformJobGraphInstance pjInstance : schedulingJobInstanceMap.values()) {
                    for (PerformTaskInstance ptInstance : pjInstance.getAllTasks()) {
                        if (ptInstance != null && needsCheck(ptInstance)) {
                            checkInstance.add(new InternalInstance(ptInstance, false));
                        }
                    }
                }
            } catch (ConcurrentModificationException ex) {
                logger.error("doProcess", ex);
            }

            // 3. Query the collected instances one by one.
            for (InternalInstance iInstance : checkInstance) {
                // Call the HTTP query interface; a failing query only skips this instance.
                boolean b;
                try {
                    b = invokeNodeHttp(iInstance.ptInstance);
                } catch (Exception e) {
                    logger.error("doProcess", e);
                    continue;
                }
                // Process the answer only when the response flag says so.
                if (b) {
                    try {
                        processNodeResponse(iInstance.ptInstance, iInstance.onlyOne);
                    } catch (Exception e) {
                        logger.error("processNodeResponse", e);
                    }
                }
            }
        }

        /**
         * Queries the node of a task instance over HTTP and copies state, progress and
         * output parameters of the response into the task instance.
         *
         * @param ptInstance task instance to query
         * @return the HaveScheduleTask value of the response ({@code true} when the response has none)
         * @throws Exception when the handler lookup or the query fails
         */
        @SuppressWarnings("all")
        private boolean invokeNodeHttp(PerformTaskInstance ptInstance) throws Exception {
            TaskHttpInvokeHandler<?> handler = (TaskHttpInvokeHandler<?>) getTaskInvokeHandler(TaskInvokeHandler.InvokeKind.HTTP);
            HttpInvokeObject invokeObject = new HttpInvokeObject();
            invokeObject.set(ptInstance.getTaskInstance());
            invokeObject.setInputParam(ptInstance.getInputParams());
            invokeObject.setIp(ptInstance.getNodeIp());
            invokeObject.setPort(ptInstance.getNodePort());

            Map<String, Object> resp = (Map<String, Object>) handler.invokeQuery(invokeObject);
            TaskState state = TaskState.of(ResponseUtil.getState(resp, -1));
            // Refresh the timestamp of the task instance after every answered query.
            ptInstance.setTimestamp(System.currentTimeMillis());
            ptInstance.getTaskInstance().setTaskState(state);
            ptInstance.getTaskInstance().setProgress(ResponseUtil.getCurProgress(resp, 0));
            // NOTE(review): the exception info is filled from getOutputParams(...), the same value
            // as the output parameter json below - looks like a copy/paste slip; confirm which
            // ResponseUtil accessor should be used here.
            ptInstance.getTaskInstance().setExceptionInfo(ResponseUtil.getOutputParams(resp, null));
            ptInstance.getTaskInstance().setOutputParamJson(ResponseUtil.getOutputParams(resp, null));
            ptInstance.mergeOutputParams(ptInstance.getTaskInstance().getOutputParamJson());
            return ResponseUtil.getHaveScheduleTask(resp, true);
        }

        /** Raises the stop flag and wakes the thread when it is waiting for the next round. */
        void shutdown() {
            if (this.isStop) {
                return;
            }

            this.isStop = true;
            synchronized (lock) {
                lock.notify();
            }
        }
    }

    /**
     * Resolves the invoke handler for the given kind. The handler id configured for the
     * kind is used when one is set; otherwise the first handler registered for the kind
     * is taken.
     *
     * @param invokeKind kind of invocation; anything other than HTTP or JAR uses the shell handler id
     * @return the resolved handler, never {@code null}
     * @throws NullPointerException when the factory returns no handler
     */
    private TaskInvokeHandler<?, ?> getTaskInvokeHandler(TaskInvokeHandler.InvokeKind invokeKind) {
        Configuration conf = ScheduleManager.getInstance().getConfiguration();

        final String handlerId = invokeKind == TaskInvokeHandler.InvokeKind.HTTP
                ? conf.getHttpTaskInvokeHandlerId()
                : (invokeKind == TaskInvokeHandler.InvokeKind.JAR
                        ? conf.getJarTaskInvokeHandlerId()
                        : conf.getShellTaskInvokeHandlerId());

        final TaskInvokeHandler<?, ?> resolved;
        if (StringUtil.isEmpty(handlerId)) {
            resolved = TaskInvokeHandlerFactory.getFirst(invokeKind);
        } else {
            resolved = TaskInvokeHandlerFactory.get(invokeKind, handlerId);
        }

        if (resolved != null) {
            return resolved;
        }
        throw new NullPointerException("TaskHttpInvokeHandler is null");
    }

    /**
     * Persists the final state of a job graph instance, using the current time as end
     * time and 100 as progress. The update is best effort: a failure is logged and not
     * propagated to the caller.
     *
     * @param instance      job graph instance whose ids, counters and start time are written
     * @param state         final state to store
     * @param exceptionInfo error text to store, may be {@code null}
     */
    private void updateJobGraphInstanceCompleted(JobGraphInstance instance, JobGraphState state,
                                                 String exceptionInfo) {
        try {
            DataInstanceService dataInstanceService = ScheduleManager.getInstance().getDataService().getInstanceService();
            dataInstanceService.updateJobGraphInstance(instance.getInstanceId(), instance.getJobId(),
                    state, instance.getCompletedCount(), instance.getFailCount(), instance.getSkipCount(),
                    instance.getStartTime(), System.currentTimeMillis(), 100, exceptionInfo);
        } catch (Exception e) {
            // Still best effort, but no longer silent: a lost state update must be visible in the log.
            logger.error("updateJobGraphInstanceCompleted", e);
        }
    }

    /**
     * Verifies that the platform HA strategy allows this server to perform jobs.
     *
     * @throws IllegalStateException when performing is not enabled on this server
     */
    private void checkMasterServer() {
        boolean canPerform = ScheduleManager.getInstance().getPlatformHAStrategy().enabledPerform();
        if (canPerform) {
            return;
        }
        throw new IllegalStateException("It must be perform job on master server");
    }

    /**
     * Verifies that a returned count reaches the expected minimum.
     *
     * @param ret   count that was returned
     * @param value minimum count that is required
     * @param msg   message of the exception raised when {@code ret} is below {@code value}
     * @throws Exception when {@code ret} is smaller than {@code value}
     */
    @SuppressWarnings("all")
    private static void checkReturn(int ret, int value, String msg) throws Exception {
        if (ret >= value) {
            return;
        }
        throw new Exception(msg);
    }

    /**
     * Verifies that none of the given arguments is empty as judged by
     * {@code StringUtil.isEmpty}.
     *
     * @param args values to check
     * @throws NullPointerException when one of the values is empty; the message names its position
     */
    private static void checkParams(String... args) {
        for (int i = 0; i < args.length; i++) {
            if (StringUtil.isEmpty(args[i])) {
                // The rejected value is empty by definition, so report its position instead
                // (the old message concatenated the value itself: "nullis null").
                throw new NullPointerException("parameter at index " + i + " is null or empty");
            }
        }
    }

}
